# Shared examples for a `subject` that does not offer subscriptions.
#
# The including spec must supply a `subject` responding to
# `supports_subscriptions?`, `subscribe` and `adapter`.
RSpec.shared_examples 'subscriptions are not supported' do
  describe '#supports_subscriptions?' do
    it 'returns false' do
      supported = subject.supports_subscriptions?
      expect(supported).to eq(false)
    end
  end

  describe '#subscribe' do
    it 'raises an error' do
      target = double('destination')
      noop_consumer = lambda { |_| }
      # The error message is expected to name the adapter's class.
      expected_message = "#subscribe is not supported by #{subject.adapter.class}"

      expect { subject.subscribe(target, &noop_consumer) }.to raise_error(expected_message)
    end
  end
end

# Shared examples for a `subject` that offers subscriptions.
#
# Block parameter:
#   subscription_type - the class every created subscription must be a kind of.
#
# The including spec must supply the names used but not defined here:
# `subject`, `adapter`, `adapter_context` and `pause_if_needed`.
# NOTE(review): `pause_if_needed` is presumably a hook that lets asynchronous
# adapters finish delivering messages before assertions run - confirm against
# its definition.
RSpec.shared_examples 'subscriptions are supported' do |subscription_type|
  describe '#supports_subscriptions?' do
    it 'returns true' do
      expect(subject.supports_subscriptions?).to eq(true)
    end
  end

  let(:destination) { adapter_context.create_destination('subscriptions_example_queue') }

  let(:message1) { 'message 1' }
  let(:message2) { 'message 2' }
  # Collects every message handed to the default consumer.
  let(:messages) { [] }
  let(:consumer) do
    lambda do |msg|
      messages << msg
    end
  end

  describe '#subscribe' do
    before do
      # Start from an empty destination when the destination supports purging.
      destination.purge if destination.respond_to? :purge
    end

    # Lazily created: examples decide when the subscription starts by
    # referencing `subscription`.
    let(:subscription) { adapter_context.subscribe(destination, &consumer) }
    after do
      subscription.unsubscribe
    end

    it 'returns a MessageDriver::Subscription::Base' do
      expect(subscription).to be_a MessageDriver::Subscription::Base
    end

    context 'the subscription' do
      subject { subscription }

      it { is_expected.to be_a MessageDriver::Subscription::Base }
      it { is_expected.to be_a subscription_type }

      describe '#adapter' do
        it { expect(subject.adapter).to be adapter }
      end

      describe '#destination' do
        it { expect(subject.destination).to be destination }
      end

      describe '#consumer' do
        it { expect(subject.consumer).to be consumer }
      end

      describe '#unsubscribe' do
        it "makes it so messages don't go to the consumer any more" do
          subscription.unsubscribe
          pause_if_needed
          expect do
            destination.publish('should not be consumed')
            # Pause before measuring, as the other delivery examples do;
            # otherwise the "no change" check could pass before a stray
            # delivery would become visible.
            pause_if_needed
          end.to_not change { messages.size }
        end
      end
    end

    context 'when there are already messages in the destination' do
      before do
        destination.publish(message1)
        destination.publish(message2)
      end

      it 'plays the messages into the consumer' do
        expect do
          subscription
          pause_if_needed
        end.to change { messages.size }.from(0).to(2)
        bodies = messages.map(&:body)
        expect(bodies).to include(message1)
        expect(bodies).to include(message2)
      end

      it 'removes the messages from the queue' do
        pause_if_needed
        expect do
          subscription
          pause_if_needed
        end.to change { destination.message_count }.from(2).to(0)
      end
    end

    context 'when a message is published to the destination' do
      before do
        # Subscribe first so the published message finds an active consumer.
        subscription
      end

      it 'consumes the message into the consumer instead of putting them on the queue' do
        expect do
          expect do
            subject.publish(destination, message1)
            pause_if_needed
          end.to change { messages.length }.from(0).to(1)
        end.to_not change { destination.message_count }
        expect(messages[0].body).to eq(message1)
      end
    end

    context 'when the consumer raises an error' do
      let(:error) { RuntimeError.new('oh nos!') }
      # Overrides the default consumer with one that always fails.
      let(:consumer) do
        lambda do |_|
          fail error
        end
      end

      before do
        destination.publish(message1)
        destination.publish(message2)
      end

      it 'keeps processing the messages' do
        pause_if_needed
        expect do
          subscription
          pause_if_needed
        end.to change { destination.message_count }.from(2).to(0)
      end

      context 'an error_handler is provided' do
        let(:error_handler) { double(:error_handler, call: nil) }
        let(:subscription) { adapter_context.subscribe(destination, error_handler: error_handler, &consumer) }

        it 'passes the errors and the messages to the error handler' do
          subscription
          pause_if_needed
          expect(error_handler).to have_received(:call).with(error, kind_of(MessageDriver::Message::Base)).at_least(2).times
        end
      end
    end
  end
end
